package com.shujia.spark.streaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object Demo2UpdateStateByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]")
    conf.setAppName("wc")
    val sc = new SparkContext(conf)

    //1、创建spark streaming环境
    //指定处理的间隔时间
    val ssc = new StreamingContext(sc, Durations.seconds(5))

    //设置checkpoint路径
    //用于保存状态
    ssc.checkpoint("data/checkpoint")

    //2、读取数据
    //nc -lk 8888
    //yum install nc
    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)


    //统计单词的数量
    val kvDS: DStream[(String, Int)] = linesDS
      .flatMap(_.split(","))
      .map((_, 1))

    //updateStateByKey(有状态算子): 每一次计算更新每一个key的状态（单词的数量）
    val countDS: DStream[(String, Int)] = kvDS.updateStateByKey {
      /**
       * seq： 当前批次一个key所有value
       * state: 之前的结果（状态：之前的单词的数量）
       */
      case (seq: Seq[Int], state: Option[Int]) =>
        println(seq)
        println(state)
        //计算当前批次单词的数量
        val sum: Int = seq.sum
        //获取之前单词的数量
        val count: Int = state match {
          case Some(count) => count
          case None => 0
        }
        //计算新的单词的数量并返回
        Option(sum + count)
    }
    countDS.print()

    //启动spark streaming程序
    ssc.start()
    ssc.awaitTermination()
  }
}
